backgroundTask

fastapi内置的后台执行任务的方法

  • 需要一定等待时间,比如发送邮件
  • 对响应的没有影响,比如处理文件

使用案例:


def write_notification(email: str, message=""):

    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    # 添加的任务,极其参数
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "信息已经在后台任务中处理"}

依赖注入方式

from typing import Optional
from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Optional[str] = None):
if q:
  message = f"found query: {q}\n" background_tasks.add_task(write_log, message)
return q


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)):
    message = f"message to {email}\n" background_tasks.add_task(write_log, message)
return {"message": "Message sent"}